1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2005-2006 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: DefaultConnectionPool.java,v 1.9 2006/12/16 12:37:17 tanderson Exp $
44 */
45 package org.exolab.jms.net.connector;
46
47 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
48 import org.apache.commons.logging.Log;
49 import org.apache.commons.logging.LogFactory;
50 import org.exolab.jms.common.threads.ThreadFactory;
51 import org.exolab.jms.net.uri.URI;
52 import org.exolab.jms.net.util.Properties;
53
54 import java.security.Principal;
55 import java.util.ArrayList;
56 import java.util.Collections;
57 import java.util.HashMap;
58 import java.util.List;
59 import java.util.Map;
60
61
62 /***
63 * Manages a pool of {@link ManagedConnection} instances, for a particular
64 * {@link ManagedConnectionFactory}.
65 *
66 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67 * @version $Revision: 1.9 $ $Date: 2006/12/16 12:37:17 $
68 * @see AbstractConnectionManager
69 */
70 class DefaultConnectionPool
71 implements ManagedConnectionAcceptorListener,
72 ManagedConnectionListener, ConnectionPool {
73
74 /***
75 * The connection factory.
76 */
77 private final ManagedConnectionFactory _factory;
78
79 /***
80 * Invocation handler to assign to each new connection.
81 */
82 private final InvocationHandler _handler;
83
84 /***
85 * The connection factory for resolving connections via their URI.
86 */
87 private final ConnectionFactory _resolver;
88
89 /***
90 * The set of allocated connections.
91 */
92 private List _connections = Collections.synchronizedList(new ArrayList());
93
94 /***
95 * A map of ManagedConnection -> ManagedConnectionHandle. The handles are
96 * used to reap idle connections.
97 */
98 private Map _handles = Collections.synchronizedMap(new HashMap());
99
100 /***
101 * The set of connection acceptors.
102 */
103 private List _acceptors = Collections.synchronizedList(new ArrayList());
104
105 /***
106 * The set of accepted connections.
107 */
108 private List _accepted = Collections.synchronizedList(new ArrayList());
109
110 /***
111 * The set of all connections, as a map of ManagedConnection -> PoolEntry
112 * instances.
113 */
114 private Map _entries = Collections.synchronizedMap(new HashMap());
115
116 /***
117 * Reap thread synchronization helper.
118 */
119 private final Object _reapLock = new Object();
120
121 /***
122 * Clock daemon for periodically running the reaper.
123 */
124 private ClockDaemon _daemon;
125
126 /***
127 * Interval between pinging and reaping connections, in milliseconds.
128 * If <code>0</code> indicates not to reap connections.
129 */
130 private final long _reapInterval;
131
132 /***
133 * Iterations before a connection that hasn't responded to a ping
134 * is declared dead.
135 */
136 private final int _reapDeadIterations;
137
138 /***
139 * The maximum period that a connection may be idle before it is reaped,
140 * in milliseconds.
141 */
142 private final long _idlePeriod;
143
144 /***
145 * The caller event listener.
146 */
147 private volatile CallerListener _listener;
148
149 /***
150 * Property name prefix for pool configuration items.
151 */
152 private static final String POOL_PREFIX = "org.exolab.jms.net.pool.";
153
154 /***
155 * Configuration property to indicate the no. of reaps to wait before
156 * reaping a connection that hasn't responded to a ping.
157 */
158 private static final String DEAD_ITERATIONS = "reapDeadIterations";
159
160 /***
161 * Configuration property to indicate the reap interval.
162 */
163 private static final String REAP_INTERVAL = "reapInterval";
164
165 /***
166 * Configuration property to indicate the idle time for connections
167 * before they may be reaped.
168 */
169 private static final String IDLE_PERIOD = "idlePeriod";
170
171
172 /***
173 * The logger.
174 */
175 private static final Log _log
176 = LogFactory.getLog(DefaultConnectionPool.class);
177
178
179 /***
180 * Construct a new <code>DefaultConnectionPool</code>.
181 *
182 * @param factory the managed connection factory
183 * @param handler the invocation handler, assigned to each new managed
184 * connection
185 * @param resolver the connection factory for resolving connections via
186 * their URI
187 * @param properties configuration properties. May be <code>null</code>
188 * @throws ResourceException if any configuration property is invalid
189 */
190 public DefaultConnectionPool(ManagedConnectionFactory factory,
191 InvocationHandler handler,
192 ConnectionFactory resolver,
193 Map properties) throws ResourceException {
194 if (factory == null) {
195 throw new IllegalArgumentException("Argument 'factory' is null");
196 }
197 if (handler == null) {
198 throw new IllegalArgumentException("Argument 'handler' is null");
199 }
200 if (resolver == null) {
201 throw new IllegalArgumentException("Argument 'resolver' is null");
202 }
203 _factory = factory;
204 _handler = handler;
205 _resolver = resolver;
206
207 Properties config = new Properties(properties, POOL_PREFIX);
208 _reapInterval = getPropertyMillis(config, REAP_INTERVAL, 60);
209 _reapDeadIterations = config.getInt(DEAD_ITERATIONS, 5);
210 _idlePeriod = getPropertyMillis(config, IDLE_PERIOD, 5);
211 }
212
213 private long getPropertyMillis(Properties config, String key,
214 int defaultValue) throws ResourceException {
215 int seconds = config.getInt(key, defaultValue);
216 if (seconds < 0) {
217 seconds = 0;
218 }
219 return seconds * 1000;
220 }
221
222 /***
223 * Creates a new connection.
224 *
225 * @param principal the security principal
226 * @param info the connection request info
227 * @return a new connection
228 * @throws ResourceException if a connection cannot be established
229 */
230 public ManagedConnection createManagedConnection(Principal principal,
231 ConnectionRequestInfo info)
232 throws ResourceException {
233 ManagedConnection connection = _factory.createManagedConnection(
234 principal, info);
235 return add(connection, false);
236 }
237
238 /***
239 * Creates an acceptor for connections.
240 *
241 * @param authenticator authenticates incoming connections
242 * @param info the connection request info
243 * @return a new connection acceptor
244 * @throws ResourceException if an acceptor cannot be created
245 */
246 public ManagedConnectionAcceptor createManagedConnectionAcceptor(
247 Authenticator authenticator, ConnectionRequestInfo info)
248 throws ResourceException {
249
250 ManagedConnectionAcceptor acceptor;
251
252 acceptor = _factory.createManagedConnectionAcceptor(authenticator,
253 info);
254 _acceptors.add(acceptor);
255 return acceptor;
256 }
257
258 /***
259 * Returns a matched connection from the set of pooled connections.
260 *
261 * @param principal the security principal
262 * @param info the connection request info
263 * @return the first acceptable match, or <code>null</code> if none is
264 * found
265 * @throws ResourceException for any error
266 */
267 public ManagedConnection matchManagedConnections(Principal principal,
268 ConnectionRequestInfo info)
269 throws ResourceException {
270
271 ManagedConnection result;
272 synchronized (_reapLock) {
273
274 result = _factory.matchManagedConnections(_connections, principal,
275 info);
276 if (result != null) {
277
278 result = (ManagedConnection) _handles.get(result);
279 } else {
280 result = _factory.matchManagedConnections(_accepted, principal,
281 info);
282 }
283 }
284 return result;
285 }
286
287 /***
288 * Returns a matched acceptor from the set of pooled connections.
289 *
290 * @param info the connection request info
291 * @return the first acceptable match, or <code>null</code> if none is
292 * found
293 * @throws ResourceException for any error
294 */
295 public ManagedConnectionAcceptor matchManagedConnectionAcceptors(
296 ConnectionRequestInfo info) throws ResourceException {
297
298 return _factory.matchManagedConnectionAcceptors(_acceptors, info);
299 }
300
301 /***
302 * Returns a listener for handling accepted connections.
303 *
304 * @return a listener for handling accepted connections
305 */
306 public ManagedConnectionAcceptorListener
307 getManagedConnectionAcceptorListener() {
308 return this;
309 }
310
311 /***
312 * Invoked when a new connection is accepted.
313 *
314 * @param acceptor the acceptor which created the connection
315 * @param connection the accepted connection
316 */
317 public void accepted(ManagedConnectionAcceptor acceptor,
318 ManagedConnection connection) {
319 try {
320 add(connection, true);
321 } catch (ResourceException exception) {
322 _log.debug("Failed to accept connection", exception);
323 }
324 }
325
326 /***
327 * Notifies closure of a connection. The <code>ManagedConnection</code>
328 * instance invokes this to notify its registered listeners when the peer
329 * closes the connection.
330 *
331 * @param source the managed connection that is the source of the event
332 */
333 public void closed(ManagedConnection source) {
334 if (_log.isDebugEnabled()) {
335 _log.debug("Connection " + source + " closed by peer, destroying");
336 }
337 remove(source);
338 }
339
340 /***
341 * Notifies a connection related error. The <code>ManagedConnection</code>
342 * instance invokes this to notify of the occurrence of a physical
343 * connection-related error.
344 *
345 * @param source the managed connection that is the source of the event
346 * @param throwable the error
347 */
348 public void error(ManagedConnection source, Throwable throwable) {
349 if (_log.isDebugEnabled()) {
350 _log.debug("Error on connection " + source + ", destroying",
351 throwable);
352 }
353 remove(source);
354 }
355
356 /***
357 * Notifies of a successful ping.
358 *
359 * @param source the managed connection that is the source of the event
360 */
361 public void pinged(ManagedConnection source) {
362 ManagedConnectionHandle handle
363 = (ManagedConnectionHandle) _handles.get(source);
364 if (handle != null) {
365 handle.pinged();
366 }
367 }
368
369 /***
370 * Closes this connection pool, cleaning up any allocated resources.
371 *
372 * @throws ResourceException for any error
373 */
374 public void close() throws ResourceException {
375 ManagedConnectionAcceptor[] acceptors =
376 (ManagedConnectionAcceptor[]) _acceptors.toArray(
377 new ManagedConnectionAcceptor[0]);
378 _acceptors.clear();
379
380 for (int i = 0; i < acceptors.length; ++i) {
381 acceptors[i].close();
382 }
383
384 ManagedConnection[] connections =
385 (ManagedConnection[]) _entries.keySet().toArray(
386 new ManagedConnection[0]);
387 for (int i = 0; i < connections.length; ++i) {
388 connections[i].destroy();
389 }
390 _entries.clear();
391
392 _accepted.clear();
393 _connections.clear();
394
395 stopReaper();
396 }
397
398 /***
399 * Invoked when an acceptor receives an error.
400 *
401 * @param acceptor the acceptor which received the error
402 * @param throwable the error
403 */
404 public void error(ManagedConnectionAcceptor acceptor,
405 Throwable throwable) {
406 _acceptors.remove(acceptor);
407
408 String uri = "<unknown>";
409 try {
410 uri = acceptor.getURI().toString();
411 } catch (ResourceException ignore) {
412
413 }
414 _log.error("Failed to accept connections on URI=" + uri,
415 throwable);
416
417 try {
418 acceptor.close();
419 } catch (ResourceException exception) {
420 if (_log.isDebugEnabled()) {
421 _log.debug("Failed to close acceptor, URI=" + uri, exception);
422 }
423 }
424 }
425
426 /***
427 * Sets the listener for caller events.
428 *
429 * @param listener the listener
430 */
431 public void setCallerListener(CallerListener listener) {
432 _listener = listener;
433 }
434
435 /***
436 * Notifies when a managed connection is idle.
437 *
438 * @param connection the idle connection
439 */
440 protected synchronized void idle(ManagedConnectionHandle connection) {
441 connection.clearUsed();
442 if (_daemon != null) {
443 _daemon.executeAfterDelay(_idlePeriod, new IdleReaper());
444 }
445 }
446
447 /***
448 * Adds a connection to the pool. If the connection was created, a {@link
449 * ManagedConnectionHandle} will be returned, wrapping the supplied
450 * connection.
451 *
452 * @param connection the connection to add
453 * @param accepted if <code>true</code> the connection was accepted via an
454 * {@link ManagedConnectionAcceptor}, otherwise it was
455 * created via
456 * {@link ManagedConnectionFactory#createManagedConnection}
457 * @return the (possibly wrapped) connection
458 * @throws ResourceException if the connection cannot be added
459 */
460 protected ManagedConnection add(ManagedConnection connection,
461 boolean accepted) throws ResourceException {
462 ManagedConnection result;
463
464 PoolEntry entry = new PoolEntry(connection, accepted);
465 _entries.put(connection, entry);
466 if (accepted) {
467 _accepted.add(connection);
468 result = connection;
469 } else {
470 _connections.add(connection);
471 ManagedConnection handle = new ManagedConnectionHandle(
472 this, connection, _resolver);
473 _handles.put(connection, handle);
474 result = handle;
475 }
476 ContextInvocationHandler handler = new ContextInvocationHandler(
477 _handler, _resolver, result);
478 try {
479 connection.setInvocationHandler(handler);
480 connection.setConnectionEventListener(this);
481 } catch (ResourceException exception) {
482 try {
483 _log.debug("Failed to initialise connection, destroying",
484 exception);
485 connection.destroy();
486 } catch (ResourceException nested) {
487 _log.debug("Failed to destroy connection", nested);
488 } finally {
489 _entries.remove(connection);
490 if (accepted) {
491 _accepted.remove(connection);
492 } else {
493 _connections.remove(connection);
494 _handles.remove(connection);
495 }
496 }
497
498 throw exception;
499 }
500
501
502
503 entry.setInitialised();
504
505 startReaper();
506
507 return result;
508 }
509
510 /***
511 * Remove a connection from the pool.
512 *
513 * @param connection the connection to remove
514 */
515 protected void remove(ManagedConnection connection) {
516 PoolEntry entry = (PoolEntry) _entries.remove(connection);
517 if (entry != null) {
518 if (entry.getAccepted()) {
519 _accepted.remove(connection);
520 } else {
521 _connections.remove(connection);
522 _handles.remove(connection);
523 }
524 URI remoteURI = null;
525 URI localURI = null;
526 try {
527 remoteURI = connection.getRemoteURI();
528 localURI = connection.getLocalURI();
529 } catch (ResourceException exception) {
530 _log.debug("Failed to get connection URIs", exception);
531 }
532
533 try {
534 connection.destroy();
535 } catch (ResourceException exception) {
536 _log.debug("Failed to destroy connection", exception);
537 }
538 if (remoteURI != null && localURI != null) {
539 notifyDisconnection(remoteURI, localURI);
540 }
541 } else {
542 _log.debug("ManagedConnection not found");
543 }
544 if (_entries.isEmpty()) {
545 stopReaper();
546 }
547 }
548
549 /***
550 * Notify of a disconnection.
551 *
552 * @param remoteURI the remote address that the client is calling from
553 * @param localURI the local address that the client is calling to
554 */
555 private void notifyDisconnection(URI remoteURI, URI localURI) {
556 CallerListener listener = _listener;
557 if (listener != null) {
558 listener.disconnected(new CallerImpl(remoteURI, localURI));
559 }
560 }
561
562 /***
563 * Starts the reaper for dead/idle connections, if needed.
564 */
565 private synchronized void startReaper() {
566 if (_daemon == null) {
567 _daemon = new ClockDaemon();
568 ThreadFactory creator =
569 new ThreadFactory(null, "ManagedConnectionReaper", false);
570 _daemon.setThreadFactory(creator);
571
572 if (_reapInterval > 0) {
573 _daemon.executePeriodically(_reapInterval, new DeadReaper(),
574 false);
575 }
576 }
577 }
578
579 /***
580 * Stops the reaper for dead/idle connections, if needed.
581 */
582 private synchronized void stopReaper() {
583 if (_daemon != null) {
584 _daemon.shutDown();
585 _daemon = null;
586 }
587 }
588
589 /***
590 * Reap idle connections.
591 */
592 private void reapIdleConnections() {
593 Map.Entry[] entries = (Map.Entry[]) _handles.entrySet().toArray(
594 new Map.Entry[0]);
595 for (int i = 0; i < entries.length && !stopReaping(); ++i) {
596 Map.Entry entry = entries[i];
597 ManagedConnection connection =
598 (ManagedConnection) entry.getKey();
599 PoolEntry pooled = (PoolEntry) _entries.get(connection);
600 if (pooled != null && pooled.isInitialised()) {
601 ManagedConnectionHandle handle =
602 (ManagedConnectionHandle) entry.getValue();
603 if (handle.canDestroy()) {
604 if (_log.isDebugEnabled()) {
605 try {
606 _log.debug("Reaping idle connection, URI="
607 + connection.getRemoteURI()
608 + ", local URI="
609 + connection.getLocalURI());
610 } catch (ResourceException ignore) {
611
612 }
613 }
614 remove(connection);
615 }
616 }
617 }
618 }
619
620 /***
621 * Reap dead connections.
622 */
623 private void reapDeadConnections() {
624 Map.Entry[] entries = (Map.Entry[]) _handles.entrySet().toArray(
625 new Map.Entry[0]);
626 for (int i = 0; i < entries.length && !stopReaping(); ++i) {
627 Map.Entry entry = entries[i];
628 ManagedConnection connection =
629 (ManagedConnection) entry.getKey();
630 PoolEntry pooled = (PoolEntry) _entries.get(connection);
631 if (pooled != null && pooled.isInitialised()) {
632 ManagedConnectionHandle handle =
633 (ManagedConnectionHandle) entry.getValue();
634 if (!handle.used()) {
635
636
637 if (handle.pinging()) {
638 if (handle.incPingWaits() > _reapDeadIterations) {
639 remove(connection);
640 }
641 } else {
642 try {
643 handle.ping();
644 } catch (ResourceException exception) {
645 if (_log.isDebugEnabled()) {
646 try {
647 _log.debug(
648 "Failed to ping connection, URI="
649 + connection.getRemoteURI()
650 + ", localURI="
651 + connection.getLocalURI());
652 } catch (ResourceException ignore) {
653
654 }
655 }
656 remove(connection);
657 }
658 }
659 } else {
660 handle.clearUsed();
661 }
662 }
663 }
664 }
665
666 /***
667 * Helper to determines if a reaper should terminate, by checking the
668 * interrupt status of the current thread.
669 *
670 * @return <code>true</code> if the reaper should terminate
671 */
672 private boolean stopReaping() {
673 return Thread.currentThread().isInterrupted();
674 }
675
676 /***
677 * Helper class for reaping idle connections.
678 */
679 private class IdleReaper implements Runnable {
680
681 /***
682 * Run the reaper.
683 */
684 public void run() {
685 synchronized (_reapLock) {
686 try {
687 reapIdleConnections();
688 } catch (Throwable exception) {
689 _log.error(exception, exception);
690 }
691 }
692 }
693 }
694
695 /***
696 * Helper class for reaping dead connections.
697 */
698 private class DeadReaper implements Runnable {
699
700 /***
701 * Run the reaper.
702 */
703 public void run() {
704 try {
705 reapDeadConnections();
706 } catch (Throwable exception) {
707 _log.error(exception, exception);
708 }
709 }
710 }
711
712 }